
ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4 #8949

Closed
liyafan82 wants to merge 10 commits from the fly_1211_comp branch

Conversation

liyafan82
Contributor

Support compressing/decompressing RecordBatch IPC buffers by LZ4.


// first 8 bytes reserved for uncompressed length, to be consistent with the
// C++ implementation.
ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
compressedBuffer.setLong(0, unCompressedBuffer.writerIndex());
Member

Do we need to ensure this in little-endian? (c.f. https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/reader.cc#L385).

Contributor Author

Revised accordingly. Thanks for your kind reminder.
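For illustration, the little-endian concern can be sketched in plain Java with a `ByteBuffer` (the helper name `writeUncompressedLength` is hypothetical; the actual PR writes through `ArrowBuf`, but the byte-order requirement is the same):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class LengthPrefix {
  static final int SIZE_OF_MESSAGE_LENGTH = 8;

  // Write the uncompressed length into the first 8 bytes in little-endian,
  // matching how the C++ IPC reader decodes the prefix.
  static void writeUncompressedLength(ByteBuffer buf, long uncompressedLength) {
    buf.order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(0, uncompressedLength);
  }
}
```

Without forcing `ByteOrder.LITTLE_ENDIAN`, `ByteBuffer` defaults to big-endian, which is exactly the bug the reviewer is pointing at.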

decompressor = factory.fastDecompressor();
}

long decompressedLength = compressedBuffer.getLong(0);
Member

ditto

Contributor Author

Revised. Thank you.

@kiszk
Member

kiszk commented Jan 4, 2021

Looks good to me

import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.Preconditions;

import net.jpountz.lz4.LZ4Compressor;
Contributor

How was this library chosen? It looks like it might not have been released in a while?

Member

My guess is that this import refers to this.

Contributor Author

@kiszk You are right. I chose this library because our C++ implementation also depends on this repo (https://github.com/lz4/lz4).

}

@Override
public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
Contributor

Suggested change
public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {

Contributor

? or is this consistent with the existing API?

Contributor Author

Nice catch. Thank you!

Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
"The uncompressed buffer size exceeds the integer limit");

// create compressor lazily
Contributor

why?

Contributor Author

For some scenarios (e.g. flight sender), we only need the compressor, while for others (e.g. flight receiver), we only need the decompressor. So there is no need to create both eagerly.
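The lazy-creation rationale can be sketched with a small generic holder (a minimal sketch; the `Lazy` class and `Supplier` wiring are illustrative, not the PR's actual code, which initializes the lz4 compressor/decompressor fields directly on first use):

```java
import java.util.function.Supplier;

/**
 * Illustrative sketch of lazy initialization: the expensive object is
 * created on first use only, so a sender never builds a decompressor
 * and a receiver never builds a compressor.
 */
final class Lazy<T> {
  private final Supplier<T> factory;
  private T value;

  Lazy(Supplier<T> factory) {
    this.factory = factory;
  }

  synchronized T get() {
    if (value == null) {
      value = factory.get();  // created lazily, at most once
    }
    return value;
  }
}
```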

@emkornfield
Contributor

Is it possible to add a test to confirm that this can be read/written from the C++ implementation?

@HedgehogCode
Contributor

HedgehogCode commented Jan 5, 2021

When I use the changes and try to compress and decompress an empty buffer (by using a variable sized vector with only missing values) I get a SIGSEGV (hs_err_pid10504.log):

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f1209448fd0, pid=10504, tid=0x00007f1208bc7700
#
# JRE version: OpenJDK Runtime Environment (8.0_275-b01) (build 1.8.0_275-b01)
# Java VM: OpenJDK 64-Bit Server VM (25.275-b01 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# V  [libjvm.so+0x6fdfd0]  jni_ThrowNew+0xc0
#
# Core dump written. Default location: /workspaces/arrow/java/vector/core or core.10504

This can be reproduced by adding the following test to TestCompressionCodec.java:

  @Test
  public void testEmptyBuffer() throws Exception {
    final int vecLength = 10;
    final VarBinaryVector origVec = new VarBinaryVector("vec", allocator);

    origVec.allocateNew(vecLength);

    // Do not set any values (all missing)
    origVec.setValueCount(vecLength);

    final List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
    final List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
    final List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);

    // TODO assert that the decompressed buffers are correct
    AutoCloseables.close(decompressedBuffers);
  }

This looks like an error in the lz4-java library but I am not sure. I thought I should mention it here first.
(Note that I am using OpenJDK 8 and I haven't tried OpenJDK 11 yet)

@liyafan82
Contributor Author

Is it possible to add a test to confirm that this can be read/written from the C++ implementation?

@emkornfield I think it is a good idea to provide e2e cross-language integration tests.
However, I am not sure if we are ready now.

In particular, we need to change the way buffers are released after compressing.
Previously, we released the buffers by directly closing the related vectors. This no longer works, as the vectors' buffers are now released by the codec, and the compressed buffers need to be released properly.

Solving this may impact other parts of the code base, so maybe we need another issue to discuss it (if we do not address it in this PR).

@liyafan82
Contributor Author

When I use the changes and try to compress and decompress an empty buffer (by using a variable sized vector with only missing values) I get a SIGSEGV […]

@HedgehogCode Thanks a lot for your effort and information. I will take a look at the problem.

@liyafan82
Contributor Author

When I use the changes and try to compress and decompress an empty buffer (by using a variable sized vector with only missing values) I get a SIGSEGV […]

@HedgehogCode The problem happened when lz4-java tried to decompress an empty buffer. I have fixed it by special-casing empty buffers. Thanks again for your kind reminder.
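The shape of the fix can be sketched roughly like this (a minimal sketch using plain byte arrays and `java.util.zip` as a stand-in for `ArrowBuf` and lz4-java; the `EmptyGuard` class and method names are hypothetical):

```java
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

public class EmptyGuard {
  /**
   * Decompress, but special-case empty payloads instead of handing a
   * zero-length buffer to the native decompressor (which is what
   * triggered the SIGSEGV in the report above).
   */
  static byte[] decompress(byte[] compressed, int uncompressedLength) {
    if (uncompressedLength == 0) {
      return new byte[0];  // nothing to decompress
    }
    Inflater inflater = new Inflater();  // zlib stands in for LZ4 here
    inflater.setInput(compressed);
    byte[] out = new byte[uncompressedLength];
    try {
      int n = inflater.inflate(out);
      return Arrays.copyOf(out, n);
    } catch (DataFormatException e) {
      throw new IllegalStateException("corrupt compressed data", e);
    } finally {
      inflater.end();
    }
  }
}
```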

}

ByteBuffer compressed = MemoryUtil.directBuffer(
compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex());
Contributor

nit: the capacity may be (int) (compressedBuffer.writerIndex() - SIZE_OF_MESSAGE_LENGTH)?

Contributor Author

Nice catch. Thank you @stczwd
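The capacity fix can be illustrated with a plain `ByteBuffer` slice (a sketch with hypothetical names; the PR itself operates on `ArrowBuf` memory addresses via `MemoryUtil.directBuffer`):

```java
import java.nio.ByteBuffer;

public class SliceBody {
  static final int SIZE_OF_MESSAGE_LENGTH = 8;

  /**
   * View only the compressed body: start after the 8-byte length prefix,
   * and size it as writerIndex minus the prefix, not the whole writerIndex.
   */
  static ByteBuffer compressedBody(ByteBuffer buffer, int writerIndex) {
    ByteBuffer dup = buffer.duplicate();
    dup.position(SIZE_OF_MESSAGE_LENGTH);
    dup.limit(writerIndex);
    return dup.slice();  // capacity == writerIndex - SIZE_OF_MESSAGE_LENGTH
  }
}
```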

@HedgehogCode
Contributor

The comment in the BodyCompression protobuf states:

Each constituent buffer is first compressed with the indicated compressor, and then written with the uncompressed length in the first 8 bytes as a 64-bit little-endian signed integer followed by the compressed buffer bytes (and then padding as required by the protocol). The uncompressed length may be set to -1 to indicate that the data that follows is not compressed, which can be useful for cases where compression does not yield appreciable savings.

Should the check for a length of -1 be made outside of CompressionCodec implementation? I think it would be useful to do it in the #decompress(BufferAllocator, ArrowBuf) method.

Should be pretty easy if I don't miss something:

    if (decompressedLength == -1L) {
      // handle uncompressed buffers
      return compressedBuffer.slice(SIZE_OF_MESSAGE_LENGTH,
          compressedBuffer.writerIndex() - SIZE_OF_MESSAGE_LENGTH);
    }

@liyafan82
Contributor Author

Should the check for a length of -1 be made outside of CompressionCodec implementation? I think it would be useful to do it in the #decompress(BufferAllocator, ArrowBuf) method. […]

@HedgehogCode Thanks for your good suggestion. I have revised the code so that when the compressed buffer would be larger than the raw one, we directly send the raw buffer with a length of -1. In addition, I have updated the test case to make sure this code path is covered.
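The compress-side fallback can be sketched with plain byte arrays (a minimal sketch: `java.util.zip` stands in for LZ4, and the class and constant names are hypothetical; the 8-byte little-endian prefix and the -1 sentinel follow the BodyCompression comment quoted above):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.zip.Deflater;

public class CompressFallback {
  static final int PREFIX = 8;
  static final long NO_COMPRESSION = -1L;

  /**
   * Compress, but fall back to the raw bytes with a -1 length prefix
   * when compression does not actually shrink the buffer.
   */
  static byte[] compress(byte[] raw) {
    Deflater deflater = new Deflater();
    deflater.setInput(raw);
    deflater.finish();
    byte[] tmp = new byte[raw.length + 64];
    int n = deflater.deflate(tmp);
    deflater.end();

    boolean keepRaw = n >= raw.length;  // compression did not help
    byte[] body = keepRaw ? raw : Arrays.copyOf(tmp, n);
    ByteBuffer out = ByteBuffer.allocate(PREFIX + body.length)
        .order(ByteOrder.LITTLE_ENDIAN);
    out.putLong(keepRaw ? NO_COMPRESSION : raw.length);
    out.put(body);
    return out.array();
  }
}
```

A reader can then branch on the prefix: a non-negative value means "decompress the body to this length", while -1 means "the body is already the raw data".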

@emkornfield
Contributor

@liyafan82 per the recent discussion on the mailing list: I looked into it, and the lz4 page mentioned https://commons.apache.org/proper/commons-compress/javadocs/api-release/org/apache/commons/compress/compressors/lz4/package-summary.html as a port, so that might offer better compatibility as a library

@liyafan82
Contributor Author

@liyafan82 per the recent discussion on the mailing list: I looked into it, and the lz4 page mentioned https://commons.apache.org/proper/commons-compress/javadocs/api-release/org/apache/commons/compress/compressors/lz4/package-summary.html as a port, so that might offer better compatibility as a library

@emkornfield Sounds reasonable. I will update the PR accordingly. Thanks for your good suggestion.

@pitrou
Member

pitrou commented Feb 3, 2021

See PR #9408 for integration tests.

@liyafan82
Contributor Author

Switched to the commons-compress library, according to @emkornfield's suggestion.

@emkornfield
Contributor

@liyafan82 could you enable the java integration test to confirm that reading the files generated by C++ works before we merge (once we verify it is working I can take a final look)

@@ -74,6 +74,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
Contributor

I'm a little hesitant to take a direct dependency on any lz4 library. Is there a way this can be done optionally (similar to how the netty dependency for memory has been isolated)?

Contributor Author

@emkornfield Sounds reasonable. I will try to revise the PR accordingly. Thanks for your good suggestion.

@liyafan82
Contributor Author

@liyafan82 could you enable the java integration test to confirm that reading the files generated by C++ works before we merge (once we verify it is working I can take a final look)

Sure. I will do some tests for that.

@emkornfield
Contributor

…before we merge (once we verify it is working I can take a final look)

Sure. I will do some tests for that.

To run tests it should be sufficient to unskip the Java implementation in archery.

@liyafan82
Contributor Author

To avoid the direct dependency on the lz4 library, I have extracted the concrete compression codec implementations to a separate module. Will continue to work on the integration tests.

@emkornfield
Contributor

@emkornfield Another (maybe ugly) solution that comes to my mind is to use different libraries for buffers with dependent and independent blocks?

No, I think we should figure out a way to have one implementation.

@emkornfield
Contributor

@liyafan82 let me know when you think this is ready for re-review. Like I said, I think getting a baseline working so we can do the follow-up work makes the most sense here.

@liyafan82
Contributor Author

@liyafan82 let me know when you think this is ready for re-review. Like I said, I think getting a baseline working so we can do the follow-up work makes the most sense here.

@emkornfield Sorry for my delay. I am a little busy these days. I will try my best to make it ready in one or two days.

@emkornfield
Contributor

No rush, just wanted to make sure I knew when it was ready for another pass.

@liyafan82 liyafan82 force-pushed the fly_1211_comp branch 8 times, most recently from 8aab6b5 to b28986c Compare March 11, 2021 09:15
@liyafan82
Contributor Author

@emkornfield I have replied to each of the previous comments. So maybe it is ready for a new round of review. Thanks.

Contributor

@emkornfield emkornfield left a comment

Thanks @liyafan82, a few more minor comments. I'd like to see this merged sooner rather than later so we can do the follow-up work. If you don't have bandwidth please let me know, and if it is OK I can fix up my comments and push to this PR?

<version>1.20</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Contributor

hmm, wonder why netty is required here though, I'll take a closer look.

}

/**
* Process decompression by decompressing the buffer as is.
Contributor

please update the docs to match, something like:

"Slice the buffer to contain the uncompressed bytes"

import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;

import io.netty.util.internal.PlatformDependent;
Contributor

ahh this is where netty is used. we don't have an arrow wrapper for it?

Contributor Author

I guess no for now. Maybe we can have one in the future, so we can remove the dependency on Netty (and other dependencies on Netty as well).


@Override
public String getCodecName() {
return CompressionType.name(CompressionType.LZ4_FRAME);
Contributor

With the new enum, maybe we can make this an accessor that returns an enum instead? And then the byte can be extracted from there where necessary?

@liyafan82
Contributor Author

Thanks @liyafan82, a few more minor comments. I'd like to see this merged sooner rather than later so we can do the follow-up work. If you don't have bandwidth please let me know, and if it is OK I can fix up my comments and push to this PR?

@emkornfield Thanks a lot for the further comments. I think I can fix them up today.

@liyafan82
Contributor Author

With the new enum, maybe we can make this an accessor that returns an enum instead? And then the byte can be extracted from there where necessary?

Sounds good. I have revised the code accordingly.
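The accessor change can be sketched like this (an illustrative sketch: the `CodecTypes` wrapper and constructor wiring are hypothetical, with the byte values loosely following the Arrow `CompressionType` flatbuffer ids):

```java
/**
 * Illustrative sketch: expose the codec as an enum and derive the
 * on-the-wire byte from it, instead of returning a raw name string.
 */
public class CodecTypes {
  enum CodecType {
    NO_COMPRESSION((byte) -1),
    LZ4_FRAME((byte) 0),
    ZSTD((byte) 1);

    private final byte type;

    CodecType(byte type) {
      this.type = type;
    }

    byte getType() {
      return type;  // extracted where necessary, e.g. when writing IPC metadata
    }
  }
}
```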

@liyafan82
Contributor Author

please update the docs to match, something like:

"Slice the buffer to contain the uncompressed bytes"

Updated. Thank you.

@emkornfield
Contributor

+1 thank you. @liyafan82 did you have plans to work on the follow-up items or ZSTD? Otherwise I can take them up.

@HedgehogCode any thoughts on how to proceed for LZ4? We can maybe discuss more on the performance JIRA?

@liyafan82
Contributor Author

+1 thank you. @liyafan82 did you have plans to work on the follow-up items or ZSTD? Otherwise I can take them up.

@HedgehogCode any thoughts on how to proceed for LZ4? We can maybe discuss more on the performance JIRA?

@emkornfield Thanks a lot for your effort.

I have started working on ARROW-11899 yesterday.
If you are interested in any of the items (including ARROW-11899), please feel free to assign them to yourself. I'd like to help with the review/discussions :-)

@emkornfield
Contributor

If you've already started ARROW-11899 then I'll let you finish it up; hopefully it isn't too much work. We are discussing the path forward for LZ4 in general on the ML; once that is cleared up we can figure out the work, including whether @HedgehogCode is interested in contributing.

@liyafan82
Contributor Author

If you've already started ARROW-11899 then I'll let you finish it up; hopefully it isn't too much work. We are discussing the path forward for LZ4 in general on the ML; once that is cleared up we can figure out the work, including whether @HedgehogCode is interested in contributing.

Sounds good. Hopefully, I will prepare a PR in a few days.

@pitrou
Member

pitrou commented Mar 22, 2021

@liyafan82 @emkornfield Can one of you update https://github.com/apache/arrow/blob/master/docs/source/status.rst#ipc-format once this is all finished?

@liyafan82
Contributor Author

@liyafan82 @emkornfield Can one of you update https://github.com/apache/arrow/blob/master/docs/source/status.rst#ipc-format once this is all finished?

@pitrou I will keep this in mind. Thanks for your kind reminder.

GeorgeAp pushed a commit to sirensolutions/arrow that referenced this pull request Jun 7, 2021
Support compressing/decompressing RecordBatch IPC buffers by LZ4.

Closes apache#8949 from liyafan82/fly_1211_comp

Authored-by: liyafan82 <fan_li_ya@foxmail.com>
Signed-off-by: Micah Kornfield <emkornfield@gmail.com>
michalursa pushed a commit to michalursa/arrow that referenced this pull request Jun 13, 2021
Support compressing/decompressing RecordBatch IPC buffers by LZ4.

Closes apache#8949 from liyafan82/fly_1211_comp

Authored-by: liyafan82 <fan_li_ya@foxmail.com>
Signed-off-by: Micah Kornfield <emkornfield@gmail.com>